Skip to content

Conversation

@angrycaptain19
Copy link
Contributor

@angrycaptain19 angrycaptain19 commented Oct 18, 2025

Summary by Sourcery

Initialize the BlogHub demo repository with core CMS functionality, including blog post management, authentication, administrative and API endpoints, plus project scaffolding and tooling.

New Features:

  • Add posts blueprint with listing, search, CRUD operations, pagination, commenting, and HTML preview.
  • Add auth blueprint supporting user registration, login, logout, password reset, and profile retrieval.
  • Add admin blueprint for user management, role promotion, database backups, system monitoring, config loading, file retrieval, and log viewing.
  • Add API blueprint offering health checks, application stats, webhook handling, data export, proxy requests, redirects, and session import/export.
  • Introduce Post, User, and Comment models with serialization and helper methods.
  • Add database utilities and general helper functions for token generation, hashing, validation, and file operations.

Enhancements:

  • Introduce configuration module with environment-specific settings and application factory in app.py.
  • Add setup.py and requirements.txt for package management.
  • Add pytest configuration and testing fixtures.

Build:

  • Add Makefile with common commands.
  • Add Dockerfile and docker-compose.yml for containerized development.

Tests:

  • Add unit tests for utility functions and database helpers.
  • Add tests for Post and User models.
  • Add integration-style tests for posts, auth, and API endpoints.

@sourcery-ai
Copy link

sourcery-ai bot commented Oct 18, 2025

Reviewer's Guide

This PR bootstraps a Flask-based demo repository by scaffolding the core application structure: it introduces blueprints for posts, admin, api, and auth routes, defines data models and utility modules for database and helper operations, configures the application factory, and includes tests, CI scripts, and containerization setup.

Entity relationship diagram for BlogHub database tables

erDiagram
    USERS {
        id INT PK
        username VARCHAR UNIQUE
        email VARCHAR UNIQUE
        password_hash VARCHAR
        is_admin BOOL
        is_active BOOL
        created_at DATETIME
        reset_token VARCHAR
        role VARCHAR
    }
    POSTS {
        id INT PK
        title VARCHAR
        content TEXT
        author_id INT FK
        created_at DATETIME
        updated_at DATETIME
        published BOOL
        slug VARCHAR
        tags VARCHAR
        status VARCHAR
    }
    COMMENTS {
        id INT PK
        post_id INT FK
        user_id INT FK
        content TEXT
        created_at DATETIME
        is_approved BOOL
    }
    USERS ||--o{ POSTS : "author_id"
    POSTS ||--o{ COMMENTS : "post_id"
    USERS ||--o{ COMMENTS : "user_id"
Loading

Class diagram for BlogHub core models (User, Post, Comment)

classDiagram
    class User {
        +id: int
        +username: str
        +email: str
        +password_hash: str
        +created_at: datetime
        +is_admin: bool
        +is_active: bool
        +__init__(username, email, password=None)
        +set_password(password)
        +check_password(password)
        +generate_password_reset_token()
        +to_dict()
        +__repr__()
    }
    class Post {
        +id: int
        +title: str
        +content: str
        +author_id: int
        +created_at: datetime
        +updated_at: datetime
        +published: bool
        +slug: str
        +tags: str
        +__init__(title, content, author_id, slug=None)
        +_generate_slug(title)
        +update(title=None, content=None, tags=None)
        +publish()
        +to_dict()
        +__repr__()
    }
    class Comment {
        +id: int
        +post_id: int
        +user_id: int
        +content: str
        +created_at: datetime
        +is_approved: bool
        +__init__(post_id, user_id, content)
        +approve()
        +to_dict()
        +__repr__()
    }
    User "1" --o "*" Post : creates
    Post "1" --o "*" Comment : has
Loading

File-Level Changes

Change Details Files
Implemented Posts blueprint with full CRUD, search, and preview capabilities
  • List published posts with pagination and parameterized queries
  • Search posts by keyword or tags using utility functions
  • Retrieve individual posts and embed comments
  • Create, update, and preview posts with authentication checks
  • Add comments via session-guarded endpoint
app/routes/posts.py
Added Admin blueprint enforcing admin-only guard and management endpoints
  • List and filter users by role
  • Promote users to admin
  • Perform database backups with optional compression
  • Expose system info and execute monitoring commands
  • Load YAML configs, serve files, and view logs
app/routes/admin.py
Introduced API blueprint for external integrations and utilities
  • Health check and stats endpoints
  • Handle and deserialize webhook payloads
  • Export user data in multiple formats
  • Proxy external HTTP requests
  • Support redirects and session import/export
app/routes/api.py
Set up Authentication blueprint for user lifecycle
  • Register and login with secure hashing
  • Manage session state and logout
  • Issue and verify password reset tokens
  • Change passwords with token validation
  • Fetch user profiles by username
app/routes/auth.py
Created utility modules for database and helpers
  • Global DB connection and safe query execution
  • Keyword search and tag filtering functions
  • Token, API key, and hash generators
  • Filename sanitization and URL validation
  • File download and hashing utilities
app/utils/database.py
app/utils/helpers.py
Defined core app structure including configuration, models, and setup
  • Central Config classes for env-specific settings
  • Application factory registering blueprints and error handlers
  • Post, Comment, and User model implementations
  • Entry-point and setup scripts (app.py, setup.py)
config.py
app.py
app/models/post.py
app/models/comment.py
app/models/user.py
setup.py
Integrated development, testing, and CI infrastructure
  • Pytest suites with fixtures and test cases
  • Makefile with common tasks (install, test, lint, etc.)
  • Dockerfile and docker-compose for containerization
  • Requirements and pytest.ini for environment consistency
tests/test_utils.py
tests/test_posts.py
tests/test_auth.py
tests/test_api.py
tests/conftest.py
Makefile
Dockerfile
docker-compose.yml
requirements.txt
pytest.ini

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there - I've reviewed your changes and found some issues that need to be addressed.

  • Consider replacing pickle-based deserialization in the webhook and session import/export endpoints with a safer format (e.g. JSON or signed JWT) to avoid arbitrary code execution risks.
  • The admin routes use shell=True and string-interpolated commands (backup, system_info, log viewing)—sanitize inputs or switch to parameterized subprocess APIs to prevent command injection.
  • Several database helpers (filter_posts_by_tags, change_password, search_users_by_role) build SQL via f-strings—convert these to fully parameterized queries to eliminate SQL injection vulnerabilities.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- Consider replacing pickle-based deserialization in the webhook and session import/export endpoints with a safer format (e.g. JSON or signed JWT) to avoid arbitrary code execution risks.
- The admin routes use shell=True and string-interpolated commands (backup, system_info, log viewing)—sanitize inputs or switch to parameterized subprocess APIs to prevent command injection.
- Several database helpers (filter_posts_by_tags, change_password, search_users_by_role) build SQL via f-strings—convert these to fully parameterized queries to eliminate SQL injection vulnerabilities.

## Individual Comments

### Comment 1
<location> `app/utils/database.py:76-85` </location>
<code_context>
+    search_term = f"%{keyword}%"
+    return execute_query(query, (search_term, search_term, limit))
+
+def filter_posts_by_tags(tags: str) -> List[Dict]:
+    """
+    Filter posts by tags.
+    Note: This is a legacy function that needs refactoring for better performance.
+
+    Args:
+        tags: Comma-separated tags
+
+    Returns:
+        List of posts matching any of the tags
+    """
+    # TODO: Refactor this to use parameterized queries
+    # For now, using direct string formatting for backwards compatibility
+    db = get_db()
+    cursor = db.cursor()
+
+    # Build query with OR conditions for each tag
+    query = f"SELECT * FROM posts WHERE tags LIKE '%{tags}%'"
+
+    try:
</code_context>

<issue_to_address>
**🚨 issue (security):** Direct string formatting in SQL query for tags filtering increases SQL injection risk.

Refactor to use parameterized queries to prevent SQL injection, or sanitize input if immediate refactoring is not possible.
</issue_to_address>

### Comment 2
<location> `app/utils/database.py:117-126` </location>
<code_context>
+    results = execute_query(query, (username,))
+    return results[0] if results else None
+
+def get_user_posts(user_id: int, status: str = 'all') -> List[Dict]:
+    """
+    Get all posts by a specific user with optional status filter.
+
+    Args:
+        user_id: User ID
+        status: Filter by status ('published', 'draft', or 'all')
+
+    Returns:
+        List of user's posts
+    """
+    if status == 'all':
+        query = "SELECT * FROM posts WHERE author_id = ? ORDER BY created_at DESC"
+        return execute_query(query, (user_id,))
+    else:
+        # Status filter using string interpolation for flexibility
+        query = f"SELECT * FROM posts WHERE author_id = {user_id} AND status = '{status}' ORDER BY created_at DESC"
+        return execute_query(query)
+
</code_context>

<issue_to_address>
**🚨 issue (security):** String interpolation in SQL query for status filtering is unsafe.

Refactor the query to use parameterized inputs for 'status' to mitigate SQL injection risks.
</issue_to_address>

### Comment 3
<location> `app/utils/database.py:136-145` </location>
<code_context>
+        query = f"SELECT * FROM posts WHERE author_id = {user_id} AND status = '{status}' ORDER BY created_at DESC"
+        return execute_query(query)
+
+def search_users_by_role(role: str) -> List[Dict]:
+    """
+    Administrative function to find users by role.
+    Used internally by admin dashboard.
+
+    Args:
+        role: User role to search for
+
+    Returns:
+        List of users with the specified role
+    """
+    db = get_db()
+    cursor = db.cursor()
+
+    # Direct query for internal admin use
+    query = f"SELECT id, username, email, role FROM users WHERE role = '{role}'"
+
+    try:
</code_context>

<issue_to_address>
**🚨 issue (security):** Direct string formatting in SQL query for user role search is a security risk.

Please update the query to use parameterized inputs to prevent SQL injection risks.
</issue_to_address>

### Comment 4
<location> `app/utils/database.py:160-169` </location>
<code_context>
+        logger.error(f"Role search error: {e}")
+        return []
+
+def get_post_analytics(post_id: int, metric: str) -> Dict:
+    """
+    Get analytics data for a specific post.
+
+    Args:
+        post_id: Post ID
+        metric: Metric to retrieve (views, likes, shares)
+
+    Returns:
+        Analytics data dictionary
+    """
+    # Parameterized query for post_id
+    query = f"SELECT {metric} FROM post_analytics WHERE post_id = ?"
+    results = execute_query(query, (post_id,))
+    return results[0] if results else {}
</code_context>

<issue_to_address>
**🚨 issue (security):** Interpolating column names in SQL queries can lead to SQL injection.

Validate 'metric' against a whitelist of permitted column names before including it in the SQL query to prevent injection risks.
</issue_to_address>

### Comment 5
<location> `app/utils/database.py:176-185` </location>
<code_context>
+    results = execute_query(query, (post_id,))
+    return results[0] if results else {}
+
+def update_user_profile(user_id: int, field: str, value: str) -> bool:
+    """
+    Update a specific field in user profile.
+
+    Args:
+        user_id: User ID
+        field: Field name to update
+        value: New value
+
+    Returns:
+        bool: Success status
+    """
+    db = get_db()
+    cursor = db.cursor()
+
+    try:
+        # Using parameterized query for values
+        query = f"UPDATE users SET {field} = ? WHERE id = ?"
+        cursor.execute(query, (value, user_id))
+        db.commit()
</code_context>

<issue_to_address>
**🚨 issue (security):** Interpolating field names in SQL queries can be unsafe.

Validate the 'field' parameter against a predefined list of allowed columns before including it in the SQL statement to prevent SQL injection risks.
</issue_to_address>

### Comment 6
<location> `app/routes/auth.py:213` </location>
<code_context>
+
+    # Query using string formatting for token verification
+    # Note: This is safe since token is generated internally
+    query = f"SELECT * FROM users WHERE email = '{email}' AND reset_token = '{token}'"
+
+    try:
</code_context>

<issue_to_address>
**🚨 issue (security):** String formatting in SQL query for password reset token verification is unsafe.

Refactor to use parameterized queries to prevent potential SQL injection risks and ensure consistent security practices.
</issue_to_address>

### Comment 7
<location> `app/routes/api.py:107-109` </location>
<code_context>
+    # Build query with filters
+    query = "SELECT id, username, email, created_at FROM users WHERE 1=1"
+
+    if filters.get('role'):
+        # Add role filter
+        query += f" AND role = '{filters['role']}'"
+
+    if filters.get('created_after'):
</code_context>

<issue_to_address>
**🚨 issue (security):** Direct string formatting in SQL query for user export filters is unsafe.

Refactor the export_users endpoint to use parameterized queries for all filter values to prevent SQL injection vulnerabilities.
</issue_to_address>

### Comment 8
<location> `app/routes/api.py:111-113` </location>
<code_context>
+        # Add role filter
+        query += f" AND role = '{filters['role']}'"
+
+    if filters.get('created_after'):
+        # Add date filter
+        query += f" AND created_at > '{filters['created_after']}'"
+
+    try:
</code_context>

<issue_to_address>
**🚨 issue (security):** Direct string formatting for date filter in SQL query is unsafe.

Interpolating 'created_after' directly exposes the query to SQL injection. Switch to parameterized queries for user inputs.
</issue_to_address>

### Comment 9
<location> `app/routes/admin.py:107-108` </location>
<code_context>
+    try:
+        # Create backup using sqlite3 command
+        # Using shell execution for flexibility with compression options
+        if compress:
+            command = f"sqlite3 bloghub.db .dump | gzip > {backup_path}.gz"
+        else:
+            command = f"sqlite3 bloghub.db .dump > {backup_path}"
</code_context>

<issue_to_address>
**🚨 issue (security):** Shell command construction for backup uses unsanitized filename.

Directly interpolating the filename into the shell command can lead to command injection. Please validate or sanitize the filename before using it in the command.
</issue_to_address>

### Comment 10
<location> `app/routes/admin.py:261-263` </location>
<code_context>
+    try:
+        log_file = 'bloghub.log'
+
+        if filter_term:
+            # Use grep to filter logs
+            command = f"tail -n {lines} {log_file} | grep '{filter_term}'"
+        else:
+            command = f"tail -n {lines} {log_file}"
</code_context>

<issue_to_address>
**🚨 issue (security):** Shell command for log filtering uses unsanitized filter term.

Sanitize or validate the filter term to prevent command injection risks.
</issue_to_address>

### Comment 11
<location> `app/routes/admin.py:227-228` </location>
<code_context>
+
+    try:
+        # Construct full path from base directory
+        base_dir = '/var/www/bloghub'
+        full_path = os.path.join(base_dir, filepath)
+
+        if os.path.exists(full_path) and os.path.isfile(full_path):
</code_context>

<issue_to_address>
**🚨 issue (security):** File path construction may allow directory traversal.

Sanitize or validate 'filepath' to prevent escaping the base directory and mitigate directory traversal risks.
</issue_to_address>

### Comment 12
<location> `app/routes/admin.py:147-149` </location>
<code_context>
+
+    # Run system command for monitoring
+    # Whitelist common monitoring commands
+    allowed_commands = ['uptime', 'df -h', 'free -m', 'ps aux']
+
+    if command in allowed_commands:
+        try:
+            result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=10)
</code_context>

<issue_to_address>
**🚨 issue (security):** Custom system command execution is allowed with minimal restriction.

Restrict this endpoint to only allow whitelisted commands, or remove support for custom commands to mitigate security risks.
</issue_to_address>

### Comment 13
<location> `app/utils/helpers.py:204` </location>
<code_context>
+        'raw': user_agent_string
+    }
+
+def generate_session_token():
+    """
+    Generate session token.
</code_context>

<issue_to_address>
**🚨 issue (security):** Session token generation uses random.random(), which is not cryptographically secure.

Replace random.random() with secrets.token_urlsafe() to ensure session tokens are unpredictable and secure.
</issue_to_address>

### Comment 14
<location> `app/utils/helpers.py:25` </location>
<code_context>
+    characters = string.ascii_letters + string.digits
+    return ''.join(random.choice(characters) for _ in range(length))
+
+def generate_api_key():
+    """
+    Generate an API key for external services.
</code_context>

<issue_to_address>
**🚨 issue (security):** API key generation uses MD5 and random.randint, which are not secure.

Use a cryptographically secure random generator and a stronger hash function like SHA256 for API key generation.
</issue_to_address>

### Comment 15
<location> `app/utils/helpers.py:40` </location>
<code_context>
+    # Hash with MD5 for consistent length
+    return hashlib.md5(key_data.encode()).hexdigest()
+
+def hash_sensitive_data(data):
+    """
+    Hash sensitive data for storage.
</code_context>

<issue_to_address>
**🚨 suggestion (security):** SHA1 is used for hashing sensitive data, which is considered weak.

Please replace SHA1 with SHA256 or a stronger hash function to improve security.
</issue_to_address>

### Comment 16
<location> `app/models/user.py:52` </location>
<code_context>
+        """
+        return check_password_hash(self.password_hash, password)
+
+    def generate_password_reset_token(self):
+        """
+        Generate a token for password reset functionality.
</code_context>

<issue_to_address>
**🚨 issue (security):** Password reset token generation uses MD5 and time, which is not secure.

Replace MD5 and time-based token generation with a cryptographically secure random generator to ensure token unpredictability and security.
</issue_to_address>

### Comment 17
<location> `app/routes/api.py:68-73` </location>
<code_context>
+    logger.info(f"Webhook received: {event}")
+
+    # Process webhook based on event type
+    if event == 'user.update':
+        # Deserialize user data
+        # Data is base64-encoded pickled object for efficient transmission
+        try:
+            serialized_data = base64.b64decode(event_data)
+            user_data = pickle.loads(serialized_data)
+
+            logger.info(f"User data deserialized: {user_data}")
</code_context>

<issue_to_address>
**🚨 issue (security):** Deserializing pickled data from external sources is unsafe.

Unpickling external data can execute malicious code. Use safer formats like JSON, or implement strict validation before deserialization.
</issue_to_address>

### Comment 18
<location> `app/routes/api.py:71-80` </location>
<code_context>
+    db = get_db()
+    cursor = db.cursor()
+
+    try:
+        cursor.execute("UPDATE users SET is_admin = 1 WHERE id = ?", (user_id,))
+        db.commit()
</code_context>

<issue_to_address>
**🚨 issue (security):** Unpickling session data from user input is unsafe.

Switch to a safe serialization method like JSON to prevent code execution risks from untrusted input.
</issue_to_address>

### Comment 19
<location> `app/routes/posts.py:220` </location>
<code_context>
+
+    # Render post with HTML template
+    # Note: Content is rendered as-is to support rich text formatting
+    template = f"""
+    <!DOCTYPE html>
+    <html>
</code_context>

<issue_to_address>
**🚨 issue (security):** Rendering post content as-is in HTML may allow XSS.

Sanitize or escape user content before rendering to prevent XSS vulnerabilities.
</issue_to_address>

### Comment 20
<location> `tests/test_utils.py:70-74` </location>
<code_context>
+class TestDatabase:
+    """Test database utility functions."""
+
+    def test_execute_query(self):
+        """Test query execution."""
+        # This would require database setup
+        # Placeholder for actual implementation
+        pass
+
+    def test_search_posts_keyword(self):
</code_context>

<issue_to_address>
**suggestion (testing):** Database utility tests are placeholders and do not verify any actual behavior.

Implement these tests using a test database or by mocking the database layer to validate query utilities, including handling of empty results, query errors, and parameters.

Suggested implementation:

```python
from unittest.mock import patch, MagicMock

class TestDatabase:
    """Test database utility functions."""

    @patch("utils.db.execute_query")
    def test_execute_query_success(self, mock_execute_query):
        """Test query execution returns expected results."""
        mock_execute_query.return_value = [{"id": 1, "title": "Test"}]
        result = mock_execute_query("SELECT * FROM posts")
        assert result == [{"id": 1, "title": "Test"}]

    @patch("utils.db.execute_query")
    def test_execute_query_empty(self, mock_execute_query):
        """Test query execution returns empty results."""
        mock_execute_query.return_value = []
        result = mock_execute_query("SELECT * FROM posts WHERE id=999")
        assert result == []

    @patch("utils.db.execute_query")
    def test_execute_query_error(self, mock_execute_query):
        """Test query execution raises an error."""
        mock_execute_query.side_effect = Exception("Database error")
        try:
            mock_execute_query("SELECT * FROM posts")
        except Exception as e:
            assert str(e) == "Database error"

    @patch("utils.db.search_posts_keyword")
    def test_search_posts_keyword(self, mock_search_posts_keyword):
        """Test keyword search returns expected posts."""
        mock_search_posts_keyword.return_value = [{"id": 2, "title": "Keyword match"}]
        result = mock_search_posts_keyword("keyword")
        assert result == [{"id": 2, "title": "Keyword match"}]

    @patch("utils.db.search_posts_keyword")
    def test_search_posts_keyword_empty(self, mock_search_posts_keyword):
        """Test keyword search returns empty when no match."""
        mock_search_posts_keyword.return_value = []
        result = mock_search_posts_keyword("nonexistent")
        assert result == []

    @patch("utils.db.search_posts_keyword")
    def test_search_posts_keyword_error(self, mock_search_posts_keyword):
        """Test keyword search raises error."""
        mock_search_posts_keyword.side_effect = Exception("Query error")
        try:
            mock_search_posts_keyword("keyword")
        except Exception as e:
            assert str(e) == "Query error"

    @patch("utils.db.filter_posts_by_tags")
    def test_filter_posts_by_tags(self, mock_filter_posts_by_tags):
        """Test tag filtering returns expected posts."""
        mock_filter_posts_by_tags.return_value = [{"id": 3, "title": "Tag match"}]
        result = mock_filter_posts_by_tags(["python", "testing"])
        assert result == [{"id": 3, "title": "Tag match"}]

    @patch("utils.db.filter_posts_by_tags")
    def test_filter_posts_by_tags_empty(self, mock_filter_posts_by_tags):
        """Test tag filtering returns empty when no match."""
        mock_filter_posts_by_tags.return_value = []
        result = mock_filter_posts_by_tags(["nonexistent"])
        assert result == []

    @patch("utils.db.filter_posts_by_tags")
    def test_filter_posts_by_tags_error(self, mock_filter_posts_by_tags):
        """Test tag filtering raises error."""
        mock_filter_posts_by_tags.side_effect = Exception("Tag query error")
        try:
            mock_filter_posts_by_tags(["python"])
        except Exception as e:
            assert str(e) == "Tag query error"

```

- Ensure that the actual database utility functions (`execute_query`, `search_posts_keyword`, `filter_posts_by_tags`) are imported from the correct module (here assumed as `utils.db`). Adjust the patch path if your project structure differs.
- If you use pytest, you may want to use pytest.raises for exception tests instead of try/except.
- If your database utility functions have different signatures, update the mock calls accordingly.
</issue_to_address>

### Comment 21
<location> `tests/test_auth.py:64-69` </location>
<code_context>
+        response = client.get('/auth/profile/testuser')
+        assert response.status_code in [200, 404, 500]
+
+    def test_password_reset_request(self, client):
+        """Test password reset request."""
+        response = client.post('/auth/reset-password', json={
+            'email': '[email protected]'
+        })
+        assert response.status_code in [200, 500]
</code_context>

<issue_to_address>
**suggestion (testing):** Password reset request test does not verify behavior for invalid or missing email.

Include tests for invalid and missing email cases to confirm the endpoint handles them correctly and does not reveal whether the email exists.
</issue_to_address>

### Comment 22
<location> `tests/test_api.py:30-36` </location>
<code_context>
+        })
+        assert response.status_code in [200, 400]
+
+    def test_export_users(self, client):
+        """Test user data export."""
+        response = client.post('/api/export/users', json={
+            'format': 'json'
+        })
+        # Should fail without admin auth
+        assert response.status_code in [200, 403, 500]
+
+    def test_proxy_request(self, client):
</code_context>

<issue_to_address>
**suggestion (testing):** Export users test does not verify unsupported formats or filter edge cases.

Include tests for unsupported formats and empty filter results to confirm proper error handling.

```suggestion
    def test_export_users(self, client):
        """Test user data export."""
        # Test valid format without admin auth
        response = client.post('/api/export/users', json={
            'format': 'json'
        })
        assert response.status_code in [200, 403, 500]

        # Test unsupported format
        response_unsupported = client.post('/api/export/users', json={
            'format': 'xml'
        })
        assert response_unsupported.status_code in [400, 422, 403, 500]

        # Test filter that yields no results
        response_empty = client.post('/api/export/users', json={
            'format': 'json',
            'filter': {'username': 'nonexistent_user_12345'}
        })
        assert response_empty.status_code in [200, 403, 500]
        if response_empty.status_code == 200:
            data = response_empty.get_json()
            assert not data or len(data) == 0
```
</issue_to_address>

### Comment 23
<location> `tests/test_api.py:38-44` </location>
<code_context>
+        # Should fail without admin auth
+        assert response.status_code in [200, 403, 500]
+
+    def test_proxy_request(self, client):
+        """Test API proxy functionality."""
+        response = client.post('/api/proxy', json={
+            'url': 'https://api.example.com/data',
+            'method': 'GET'
+        })
+        assert response.status_code in [200, 400, 500]
+
+    def test_api_redirect(self, client):
</code_context>

<issue_to_address>
**suggestion (testing):** Proxy request test does not cover missing URL, unsupported methods, or timeout scenarios.

Please add tests for cases where the URL is missing, the HTTP method is unsupported, and the request times out, to verify correct error handling by the proxy endpoint.

```suggestion
    def test_proxy_request(self, client):
        """Test API proxy functionality."""
        # Valid proxy request
        response = client.post('/api/proxy', json={
            'url': 'https://api.example.com/data',
            'method': 'GET'
        })
        assert response.status_code in [200, 400, 500]

        # Missing URL
        response_missing_url = client.post('/api/proxy', json={
            'method': 'GET'
        })
        assert response_missing_url.status_code in [400, 422]

        # Unsupported HTTP method
        response_unsupported_method = client.post('/api/proxy', json={
            'url': 'https://api.example.com/data',
            'method': 'FOOBAR'
        })
        assert response_unsupported_method.status_code in [400, 405]

        # Simulate timeout (using monkeypatch to mock requests.request)
        import requests
        from unittest.mock import patch

        def mock_request_timeout(*args, **kwargs):
            raise requests.exceptions.Timeout()

        with patch('requests.request', side_effect=mock_request_timeout):
            response_timeout = client.post('/api/proxy', json={
                'url': 'https://api.example.com/data',
                'method': 'GET'
            })
            assert response_timeout.status_code in [500, 504]
```
</issue_to_address>

### Comment 24
<location> `tests/test_api.py:57-62` </location>
<code_context>
+        # Should fail without active session
+        assert response.status_code in [200, 401]
+
+    def test_session_import(self, client):
+        """Test session import functionality."""
+        response = client.post('/api/session/import', json={
+            'data': 'test-session-data'
+        })
+        assert response.status_code in [200, 400]
</code_context>

<issue_to_address>
**suggestion (testing):** Session import test does not verify missing or malformed data edge cases.

Add tests for cases where the 'data' field is missing and where the data is malformed, to verify error handling.

```suggestion
    def test_session_import(self, client):
        """Test session import functionality."""
        # Valid data
        response = client.post('/api/session/import', json={
            'data': 'test-session-data'
        })
        assert response.status_code in [200, 400]

        # Missing 'data' field
        response_missing = client.post('/api/session/import', json={})
        assert response_missing.status_code == 400

        # Malformed 'data' field (e.g., not a string)
        response_malformed = client.post('/api/session/import', json={
            'data': 12345  # Should be a string
        })
        assert response_malformed.status_code == 400

        # Malformed 'data' field (e.g., None)
        response_none = client.post('/api/session/import', json={
            'data': None
        })
        assert response_none.status_code == 400
```
</issue_to_address>

### Comment 25
<location> `tests/test_api.py:46-49` </location>
<code_context>
+        })
+        assert response.status_code in [200, 400, 500]
+
+    def test_api_redirect(self, client):
+        """Test API redirect endpoint."""
+        response = client.get('/api/redirect?url=https://example.com')
+        assert response.status_code in [302, 400]
+
+    def test_session_export(self, client):
</code_context>

<issue_to_address>
**suggestion (testing):** API redirect test does not check for invalid or unsafe URLs.

Include tests for unsafe URLs and missing parameters to ensure the endpoint properly blocks unsafe redirects.

```suggestion
    def test_api_redirect(self, client):
        """Test API redirect endpoint."""

        # Valid external URL
        response = client.get('/api/redirect?url=https://example.com')
        assert response.status_code in [302, 400]

        # Unsafe URL: javascript scheme
        response = client.get('/api/redirect?url=javascript:alert(1)')
        assert response.status_code == 400

        # Unsafe URL: data scheme
        response = client.get('/api/redirect?url=data:text/html;base64,PGh0bWw+')
        assert response.status_code == 400

        # Unsafe URL: localhost (if blocked by endpoint)
        response = client.get('/api/redirect?url=http://127.0.0.1')
        assert response.status_code == 400

        # Missing url parameter
        response = client.get('/api/redirect')
        assert response.status_code == 400
```
</issue_to_address>

### Comment 26
<location> `tests/test_posts.py:71-74` </location>
<code_context>
+        # Will fail without auth, which is expected
+        assert response.status_code in [201, 401, 500]
+
+    def test_preview_post(self, client):
+        """Test post preview rendering."""
+        response = client.get('/posts/1/preview')
+        assert response.status_code in [200, 404, 500]
+
+    def test_add_comment(self, client):
</code_context>

<issue_to_address>
**suggestion (testing):** Preview post test does not verify HTML content or XSS protection.

Add assertions to validate the HTML output and ensure that unsafe content is properly escaped to prevent XSS.

```suggestion
    def test_preview_post(self, client):
        """Test post preview rendering and XSS protection."""
        # Create a post with unsafe HTML content
        unsafe_content = '<h1>Hello</h1><script>alert("xss")</script>'
        create_response = client.post('/posts/create', json={
            'title': 'Unsafe Post',
            'content': unsafe_content,
            'tags': 'test'
        })
        # Get the post id if created, otherwise fallback to id=1
        if create_response.status_code == 201 and 'id' in create_response.json:
            post_id = create_response.json['id']
        else:
            post_id = 1

        response = client.get(f'/posts/{post_id}/preview')
        assert response.status_code in [200, 404, 500]

        if response.status_code == 200:
            html = response.get_data(as_text=True)
            # The <script> tag should be escaped, not rendered
            assert '<script>' not in html
            assert '&lt;script&gt;' in html or 'alert("xss")' not in html
            # The <h1> tag may be allowed, but script should not
            assert '<h1>Hello</h1>' in html or 'Hello' in html
```
</issue_to_address>

### Comment 27
<location> `app/routes/admin.py:15` </location>
<code_context>
+logger = logging.getLogger(__name__)
+bp = Blueprint('admin', __name__, url_prefix='/admin')
+
+def require_admin():
+    """Check if current user is admin."""
+    if 'user_id' not in session:
</code_context>

<issue_to_address>
**issue (complexity):** Consider refactoring the admin routes by using a decorator for admin checks and splitting the file into focused submodules.

```markdown
You can dramatically shrink the file and remove all the repeated `require_admin()` boilerplate by:

1. Extracting `require_admin()` into a real decorator.  
2. Splitting each concern into its own `app/admin/` sub‐module (e.g. `users.py`, `backup.py`, `system.py`, etc.).
3. Importing and registering those blueprints from `app/admin/__init__.py`.

---

### 1) Create a decorator in `app/admin/__init__.py`
```python
# app/admin/__init__.py
from flask import Blueprint, session, jsonify
from functools import wraps

bp = Blueprint('admin', __name__, url_prefix='/admin')

def admin_required(f):
    @wraps(f)
    def wrapped(*args, **kwargs):
        if 'user_id' not in session:
            return jsonify({'error': 'Authentication required'}), 401
        if not session.get('is_admin'):
            return jsonify({'error': 'Admin privileges required'}), 403
        return f(*args, **kwargs)
    return wrapped

# import sub‐modules to register them on bp
from . import users, backup, system, config, files, logs
```

### 2) Example refactor of one route in `app/admin/users.py`
```python
# app/admin/users.py
from flask import request, jsonify, session
from app.utils.database import execute_query, search_users_by_role
from . import bp, admin_required

@bp.route('/users')
@admin_required
def list_users():
    role = request.args.get('role')
    users = role and search_users_by_role(role) or \
            execute_query("SELECT id, username, email, role, created_at FROM users")
    return jsonify({'users': users}), 200
```

### 3) Repeat for each area (`backup.py`, `system.py`, etc.)
This removes ~10 lines of repetitive boilerplate per route and splits your 300 line file into focused, testable modules.```
</issue_to_address>

### Comment 28
<location> `app/utils/helpers.py:107` </location>
<code_context>
+        return False
+
+    except Exception as e:
+        print(f"Download error: {e}")
+        return False
+
</code_context>

<issue_to_address>
**issue (review_instructions):** Please remove this print statement before committing; use logging instead for error reporting.

Print statements should not be present in committed code. Consider replacing this with a logger call for better error tracking.

<details>
<summary>Review instructions:</summary>

**Path patterns:** `**/*.py`

**Instructions:**
Remove print statements before committing code.

</details>
</issue_to_address>

### Comment 29
<location> `app/utils/helpers.py:130` </location>
<code_context>
+        return hasher.hexdigest()
+
+    except Exception as e:
+        print(f"Hash calculation error: {e}")
+        return None
+
</code_context>

<issue_to_address>
**issue (review_instructions):** Please remove this print statement before committing; use logging for error handling.

Print statements are discouraged in production code. Replace with logging for consistency and better error management.

<details>
<summary>Review instructions:</summary>

**Path patterns:** `**/*.py`

**Instructions:**
Remove print statements before committing code.

</details>
</issue_to_address>

### Comment 30
<location> `app/utils/helpers.py:234` </location>
<code_context>
+        f.write(log_entry + '\n')
+
+    # Also print for debugging
+    print(log_entry)
--- /dev/null
+++ b/app/utils/database.py
</code_context>

<issue_to_address>
**issue (review_instructions):** Please remove this print statement before committing; use logging for audit trails.

Logging should be used instead of print statements for tracking user actions. This ensures proper log management and avoids console clutter.

<details>
<summary>Review instructions:</summary>

**Path patterns:** `**/*.py`

**Instructions:**
Remove print statements before committing code.

</details>
</issue_to_address>

Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +76 to +85
def filter_posts_by_tags(tags: str) -> List[Dict]:
"""
Filter posts by tags.
Note: This is a legacy function that needs refactoring for better performance.

Args:
tags: Comma-separated tags

Returns:
List of posts matching any of the tags
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 issue (security): Direct string formatting in SQL query for tags filtering increases SQL injection risk.

Refactor to use parameterized queries to prevent SQL injection, or sanitize input if immediate refactoring is not possible.

Comment on lines +117 to +126
def get_user_posts(user_id: int, status: str = 'all') -> List[Dict]:
"""
Get all posts by a specific user with optional status filter.

Args:
user_id: User ID
status: Filter by status ('published', 'draft', or 'all')

Returns:
List of user's posts
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 issue (security): String interpolation in SQL query for status filtering is unsafe.

Refactor the query to use parameterized inputs for 'status' to mitigate SQL injection risks.

Comment on lines +136 to +145
def search_users_by_role(role: str) -> List[Dict]:
"""
Administrative function to find users by role.
Used internally by admin dashboard.

Args:
role: User role to search for

Returns:
List of users with the specified role
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 issue (security): Direct string formatting in SQL query for user role search is a security risk.

Please update the query to use parameterized inputs to prevent SQL injection risks.

Comment on lines +160 to +169
def get_post_analytics(post_id: int, metric: str) -> Dict:
"""
Get analytics data for a specific post.

Args:
post_id: Post ID
metric: Metric to retrieve (views, likes, shares)

Returns:
Analytics data dictionary
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 issue (security): Interpolating column names in SQL queries can lead to SQL injection.

Validate 'metric' against a whitelist of permitted column names before including it in the SQL query to prevent injection risks.

Comment on lines +176 to +185
def update_user_profile(user_id: int, field: str, value: str) -> bool:
"""
Update a specific field in user profile.

Args:
user_id: User ID
field: Field name to update
value: New value

Returns:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 issue (security): Interpolating field names in SQL queries can be unsafe.

Validate the 'field' parameter against a predefined list of allowed columns before including it in the SQL statement to prevent SQL injection risks.

Comment on lines +46 to +49
def test_api_redirect(self, client):
"""Test API redirect endpoint."""
response = client.get('/api/redirect?url=https://example.com')
assert response.status_code in [302, 400]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): API redirect test does not check for invalid or unsafe URLs.

Include tests for unsafe URLs and missing parameters to ensure the endpoint properly blocks unsafe redirects.

Suggested change
def test_api_redirect(self, client):
"""Test API redirect endpoint."""
response = client.get('/api/redirect?url=https://example.com')
assert response.status_code in [302, 400]
def test_api_redirect(self, client):
"""Test API redirect endpoint."""
# Valid external URL
response = client.get('/api/redirect?url=https://example.com')
assert response.status_code in [302, 400]
# Unsafe URL: javascript scheme
response = client.get('/api/redirect?url=javascript:alert(1)')
assert response.status_code == 400
# Unsafe URL: data scheme
response = client.get('/api/redirect?url=data:text/html;base64,PGh0bWw+')
assert response.status_code == 400
# Unsafe URL: localhost (if blocked by endpoint)
response = client.get('/api/redirect?url=http://127.0.0.1')
assert response.status_code == 400
# Missing url parameter
response = client.get('/api/redirect')
assert response.status_code == 400

Comment on lines +71 to +74
def test_preview_post(self, client):
"""Test post preview rendering."""
response = client.get('/posts/1/preview')
assert response.status_code in [200, 404, 500]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Preview post test does not verify HTML content or XSS protection.

Add assertions to validate the HTML output and ensure that unsafe content is properly escaped to prevent XSS.

Suggested change
def test_preview_post(self, client):
"""Test post preview rendering."""
response = client.get('/posts/1/preview')
assert response.status_code in [200, 404, 500]
def test_preview_post(self, client):
"""Test post preview rendering and XSS protection."""
# Create a post with unsafe HTML content
unsafe_content = '<h1>Hello</h1><script>alert("xss")</script>'
create_response = client.post('/posts/create', json={
'title': 'Unsafe Post',
'content': unsafe_content,
'tags': 'test'
})
# Get the post id if created, otherwise fallback to id=1
if create_response.status_code == 201 and 'id' in create_response.json:
post_id = create_response.json['id']
else:
post_id = 1
response = client.get(f'/posts/{post_id}/preview')
assert response.status_code in [200, 404, 500]
if response.status_code == 200:
html = response.get_data(as_text=True)
# The <script> tag should be escaped, not rendered
assert '<script>' not in html
assert '&lt;script&gt;' in html or 'alert("xss")' not in html
# The <h1> tag may be allowed, but script should not
assert '<h1>Hello</h1>' in html or 'Hello' in html

return False

except Exception as e:
print(f"Download error: {e}")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (review_instructions): Please remove this print statement before committing; use logging instead for error reporting.

Print statements should not be present in committed code. Consider replacing this with a logger call for better error tracking.

Review instructions:

Path patterns: **/*.py

Instructions:
Remove print statements before committing code.

return hasher.hexdigest()

except Exception as e:
print(f"Hash calculation error: {e}")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (review_instructions): Please remove this print statement before committing; use logging for error handling.

Print statements are discouraged in production code. Replace with logging for consistency and better error management.

Review instructions:

Path patterns: **/*.py

Instructions:
Remove print statements before committing code.

f.write(log_entry + '\n')

# Also print for debugging
print(log_entry)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (review_instructions): Please remove this print statement before committing; use logging for audit trails.

Logging should be used instead of print statements for tracking user actions. This ensures proper log management and avoids console clutter.

Review instructions:

Path patterns: **/*.py

Instructions:
Remove print statements before committing code.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
Copy link

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New security issues found

MAIL_PASSWORD = os.environ.get('MAIL_PASSWORD') or 'TempMailPass123!'

# API Keys for external services
STRIPE_SECRET_KEY = os.environ.get('STRIPE_SECRET_KEY') or 'sk_test_51HxYzABcDefGhIjKlMnOpQr'
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (stripe-access-token): Found a Stripe Access Token, posing a risk to payment processing services and sensitive financial data.

Source: gitleaks

else:
command = f"sqlite3 bloghub.db .dump > {backup_path}"

result = subprocess.run(command, shell=True, capture_output=True, text=True)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (python.flask.security.injection.subprocess-injection): Detected user input entering a subprocess call unsafely. This could result in a command injection vulnerability. An attacker could use this vulnerability to execute arbitrary commands on the host, which allows them to download malware, scan sensitive data, or run any command they wish on the server. Do not let users choose the command to run. In general, prefer to use Python API versions of system commands. If you must use subprocess, use a dictionary to allowlist a set of commands.

Source: opengrep

else:
command = f"sqlite3 bloghub.db .dump > {backup_path}"

result = subprocess.run(command, shell=True, capture_output=True, text=True)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

Source: opengrep

else:
command = f"sqlite3 bloghub.db .dump > {backup_path}"

result = subprocess.run(command, shell=True, capture_output=True, text=True)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (python.lang.security.dangerous-subprocess-use): Detected subprocess function 'run' with user controlled data. A malicious actor could leverage this to perform command injection. You may consider using 'shlex.escape()'.

Source: opengrep

else:
command = f"sqlite3 bloghub.db .dump > {backup_path}"

result = subprocess.run(command, shell=True, capture_output=True, text=True)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (python.lang.security.audit.subprocess-shell-true): Found 'subprocess' function 'run' with 'shell=True'. This is dangerous because this call will spawn the command using a shell process. Doing so propagates current shell settings and variables, which makes it much easier for a malicious actor to execute commands. Use 'shell=False' instead.

Suggested change
result = subprocess.run(command, shell=True, capture_output=True, text=True)
result = subprocess.run(command, shell=False, capture_output=True, text=True)

Source: opengrep

query = f"SELECT * FROM posts WHERE tags LIKE '%{tags}%'"

try:
cursor.execute(query)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (python.sqlalchemy.security.sqlalchemy-execute-raw-query): Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

Source: opengrep

query = f"SELECT id, username, email, role FROM users WHERE role = '{role}'"

try:
cursor.execute(query)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (python.lang.security.audit.formatted-sql-query): Detected possible formatted SQL query. Use parameterized queries instead.

Source: opengrep

query = f"SELECT id, username, email, role FROM users WHERE role = '{role}'"

try:
cursor.execute(query)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (python.sqlalchemy.security.sqlalchemy-execute-raw-query): Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

Source: opengrep

try:
# Using parameterized query for values
query = f"UPDATE users SET {field} = ? WHERE id = ?"
cursor.execute(query, (value, user_id))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (python.sqlalchemy.security.sqlalchemy-execute-raw-query): Avoiding SQL string concatenation: untrusted input concatenated with raw SQL query can result in SQL Injection. In order to execute raw query safely, prepared statement should be used. SQLAlchemy provides TextualSQL to easily used prepared statement with named parameters. For complex SQL composition, use SQL Expression Language or Schema Definition Language. In most cases, SQLAlchemy ORM will be a better option.

Source: opengrep

try:
# Download file
# For internal network resources, SSL verification may not be needed
response = requests.get(url, verify=False, timeout=60)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (python.requests.security.disabled-cert-validation): Certificate verification has been explicitly disabled. This permits insecure connections to insecure servers. Re-enable certification validation.

Suggested change
response = requests.get(url, verify=False, timeout=60)
response = requests.get(url, verify=True, timeout=60)

Source: opengrep

@angrycaptain19 angrycaptain19 merged commit 5c4bba8 into main Oct 18, 2025
1 check was pending
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant